/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.phoenix.mapreduce;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PColumnFamily;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
import org.apache.phoenix.util.Closeables;
import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reducer class for the bulkload jobs.
 */
public class FormatToKeyValueReducer
  extends Reducer<TableRowkeyPair, ImmutableBytesWritable, TableRowkeyPair, KeyValue> {

  protected static final Logger LOGGER = LoggerFactory.getLogger(FormatToKeyValueReducer.class);

  protected List<String> tableNames;
  protected List<String> logicalNames;
  protected KeyValueBuilder builder;
  private Map<Integer, Pair<byte[], byte[]>> columnIndexes;

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();

    // pass client configuration into driver
    Properties clientInfos = new Properties();
    for (Map.Entry<String, String> entry : conf) {
      clientInfos.setProperty(entry.getKey(), entry.getValue());
    }
    try (PhoenixConnection conn =
      (PhoenixConnection) QueryUtil.getConnectionOnServer(clientInfos, conf)) {
      builder = conn.getKeyValueBuilder();
      final String tableNamesConf = conf.get(FormatToBytesWritableMapper.TABLE_NAMES_CONFKEY);
      final String logicalNamesConf = conf.get(FormatToBytesWritableMapper.LOGICAL_NAMES_CONFKEY);
      tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
      logicalNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(logicalNamesConf);
      initColumnsMap(conn);
    } catch (SQLException e) {
      throw new RuntimeException(e);
    }
  }

  private void initColumnsMap(PhoenixConnection conn) throws SQLException {
    Map<byte[], Integer> indexMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    columnIndexes = new HashMap<>();
    int columnIndex = 0;
    for (int index = 0; index < logicalNames.size(); index++) {
      PTable table = conn.getTable(logicalNames.get(index));
      if (!table.getImmutableStorageScheme().equals(ImmutableStorageScheme.ONE_CELL_PER_COLUMN)) {
        List<PColumnFamily> cfs = table.getColumnFamilies();
        for (int i = 0; i < cfs.size(); i++) {
          byte[] family = cfs.get(i).getName().getBytes();
          Pair<byte[], byte[]> pair =
            new Pair<>(family, QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES);
          columnIndexes.put(new Integer(columnIndex), pair);
          columnIndex++;
        }
      } else {
        List<PColumn> cls = table.getColumns();
        for (int i = 0; i < cls.size(); i++) {
          PColumn c = cls.get(i);
          byte[] family = new byte[0];
          byte[] cq;
          if (!SchemaUtil.isPKColumn(c)) {
            family = c.getFamilyName().getBytes();
            cq = c.getColumnQualifierBytes();
          } else {
            cq = c.getName().getBytes();
          }
          byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, cq);
          Pair<byte[], byte[]> pair = new Pair<>(family, cq);
          if (!indexMap.containsKey(cfn)) {
            indexMap.put(cfn, new Integer(columnIndex));
            columnIndexes.put(new Integer(columnIndex), pair);
            columnIndex++;
          }
        }
      }
      byte[] emptyColumnFamily = SchemaUtil.getEmptyColumnFamily(table);
      byte[] emptyKeyValue = EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst();
      Pair<byte[], byte[]> pair = new Pair<>(emptyColumnFamily, emptyKeyValue);
      columnIndexes.put(new Integer(columnIndex), pair);
      columnIndex++;
    }
  }

  @Override
  protected void reduce(TableRowkeyPair key, Iterable<ImmutableBytesWritable> values,
    Reducer<TableRowkeyPair, ImmutableBytesWritable, TableRowkeyPair, KeyValue>.Context context)
    throws IOException, InterruptedException {
    TreeSet<KeyValue> map = new TreeSet<KeyValue>(CellComparatorImpl.COMPARATOR);
    for (ImmutableBytesWritable aggregatedArray : values) {
      DataInputStream input = new DataInputStream(new ByteArrayInputStream(aggregatedArray.get()));
      while (input.available() != 0) {
        byte type = input.readByte();
        long timestamp = WritableUtils.readVLong(input);
        int index = WritableUtils.readVInt(input);
        ImmutableBytesWritable family;
        ImmutableBytesWritable cq;
        ImmutableBytesWritable value = QueryConstants.EMPTY_COLUMN_VALUE_BYTES_PTR;
        Pair<byte[], byte[]> pair = columnIndexes.get(index);
        family = new ImmutableBytesWritable(pair.getFirst());
        cq = new ImmutableBytesWritable(pair.getSecond());
        int len = WritableUtils.readVInt(input);
        if (len > 0) {
          byte[] array = new byte[len];
          input.read(array);
          value = new ImmutableBytesWritable(array);
        }
        KeyValue kv;
        KeyValue.Type kvType = KeyValue.Type.codeToType(type);
        switch (kvType) {
          case Put: // not null value
            kv = builder.buildPut(key.getRowkey(), family, cq, timestamp, value);
            break;
          case DeleteColumn: // null value
            kv = builder.buildDeleteColumns(key.getRowkey(), family, cq, timestamp);
            break;
          default:
            throw new IOException("Unsupported KeyValue type " + kvType);
        }
        map.add(kv);
      }
      Closeables.closeQuietly(input);
    }
    context.setStatus("Read " + map.getClass());
    int index = 0;
    for (KeyValue kv : map) {
      context.write(key, kv);
      if (++index % 100 == 0) context.setStatus("Wrote " + index);
    }
  }
}
